Get wiki views and scrape Wikipedia, using the HYG dataset for star locations. This is a summary notebook with the not-so-interesting bits removed (see the projections notebook for azimuthal projections etc.)
# data handling
import pandas as pd
import numpy as np
# wikipedia scraping / parsing
import wikitextparser as wtp
import requests, re, csv, pickle, json
from collections import defaultdict
# plotting (offline notebook mode)
from plotly.offline import iplot, init_notebook_mode
import plotly.graph_objs as go
init_notebook_mode()  # injects plotly.js into the notebook
import plotly.io as pio
%%javascript
// Hack to fix export: register the CDN locations of d3/jquery/plotly with
// RequireJS so that plotly figures still render in the exported HTML.
require.config({
    paths: {
        d3: 'https://cdnjs.cloudflare.com/ajax/libs/d3/5.9.2/d3',
        jquery: 'https://code.jquery.com/jquery-3.4.1.min',
        plotly: 'https://cdn.plot.ly/plotly-latest.min'
    },
    shim: {
        plotly: {
            // plotly needs d3 and jquery loaded first and does not
            // declare itself as an AMD module, hence the shim.
            deps: ['d3', 'jquery'],
            exports: 'plotly'
        }
    }
});
See https://blog.matteoferla.com/2019/07/wikipedia-datamining.html
## The class to parse wiki categories and pages.
class WikicatParser():
    """
    Gets all the pages recursively within a category, parses the content
    (via a supplied function or a list of wanted templates) and gets pageviews.

    >>> pages = WikicatParser(cat_name, custom_page_parser=my_function, extra_fields=[], forbidden_categories_keywords=[...])
    >>> pages.get_pages_recursively()
    >>> print(pages.data)
    >>> pandas.DataFrame.from_records(list(pages.data.values()))

    ``custom_page_parser`` is for content mining: a function that, given wiki
    text, returns a dictionary of whatever it mined.
    Any extra fields mined need to be added to ``extra_fields`` or ``to_csv``
    will fail.
    ``.get_pages_recursively`` gets everything downwards. Do note that
    ``.forbidden_categories_keywords`` may need to be set to prune unwanted
    subcategories.
    It calls both ``.get_pages`` and ``.get_subcategories``, both of which call
    ``.get_members``, which calls ``.get``, the web fetcher.
    ``.get_pageviews`` gets the daily average page views.
    """
    api = "https://en.wikipedia.org/w/api.php"

    def __init__(self, category,
                 no_views=False,
                 no_content=False,
                 custom_page_parser=None,
                 wanted_templates=None,
                 extra_fields=None,
                 forbidden_categories_keywords=None):
        """
        :param category: category name, with or without the ``Category:`` prefix.
        :param no_views: if True, skip fetching pageviews.
        :param no_content: if True, skip fetching and parsing page content.
        :param custom_page_parser: function(wikitext) -> dict of mined fields.
        :param wanted_templates: template names to mine when no custom parser is given.
        :param extra_fields: extra csv column names produced by the parser.
        :param forbidden_categories_keywords: a string or a list of strings;
            subcategories whose title contains any of them (case-insensitively)
            are skipped during recursion.
        """
        self.session = requests.Session()
        self.no_views = no_views
        self.no_content = no_content
        self.data = {}  # page title -> dict of mined data
        # normalise the category name to carry the namespace prefix.
        if 'Category:' not in category:
            self.category = 'Category:' + category
        else:
            self.category = category
        self.category_map = {}  # category title -> list of kept subcategory titles
        self.category_cleaned = category.replace(' ', '_').replace('Category:', '')
        if custom_page_parser:
            self.page_parser = custom_page_parser
        elif wanted_templates:
            self.wanted_templates = wanted_templates
            self.page_parser = self.parse_templates
        else:
            # nothing to mine: do not bother fetching the content at all.
            self.no_content = True
            self.page_parser = lambda text: {}
        self.extra_fields = extra_fields if extra_fields else []
        if forbidden_categories_keywords:
            if isinstance(forbidden_categories_keywords, str):
                # bugfix: this used to read self.forbidden_categories_keywords
                # before the attribute was ever assigned (AttributeError).
                self.forbidden_categories_keywords = [forbidden_categories_keywords.lower()]
            else:
                self.forbidden_categories_keywords = [k.lower() for k in forbidden_categories_keywords]
        else:
            self.forbidden_categories_keywords = []

    def get(self, params):
        """
        Fetch data from the MediaWiki API, recursively following
        ``cmcontinue`` pagination and concatenating the result lists.
        """
        data = self.session.get(url=self.api, params=params).json()
        if 'continue' in data:
            params['cmcontinue'] = data['continue']['cmcontinue']
            t = next(iter(data['query']))  # e.g. 'categorymembers'
            new_data = self.get(params)
            new_data['query'][t] = [*data['query'][t], *new_data['query'][t]]
            data = new_data
        return data

    def _add_datum(self, data, cat):
        """Store each member dict in ``self.data``, fetching views/content as configured."""
        for d in data:
            name = d["title"]
            if name not in self.data:
                self.data[name] = d
                self.data[name]['category'] = cat
                if not self.no_views:
                    self.data[name]['views'] = self.get_pageviews(name)
                if not self.no_content:
                    wiki = self.get_content(name)
                    for key, value in self.page_parser(wiki).items():
                        self.data[name][key] = value
            else:
                # already seen via another category: record the extra membership.
                self.data[name]["category"] += '|' + cat

    def get_subcategories(self, cat):
        """Return the subcategories of ``cat``, minus any forbidden ones."""
        subcats = []
        for subcat in self.get_members(cat, 'subcat'):
            for k in self.forbidden_categories_keywords:
                if k in subcat['title'].lower():
                    print(f'BAN: {subcat["title"]} removed because it contained {k}')
                    break
            else:  # no forbidden keyword matched
                subcats.append(subcat)
        self.category_map[cat] = [s['title'] for s in subcats]
        return subcats

    def get_page_by_name(self, name, cat='Manual'):
        """Add a single page by name (a manual fix for pages the recursion missed)."""
        self._add_datum([{'title': name}], cat)

    def get_pages(self, cat):
        """Get all the pages directly within the category."""
        return self.get_members(cat, 'page')

    def get_members(self, cat, cmtype='subcat|page'):
        """Fetch the members of ``cat`` of the given ``cmtype`` and store them."""
        params = {
            'action': "query",
            'list': "categorymembers",
            'cmtitle': cat,
            'cmtype': cmtype,
            'cmdir': "desc",
            'format': "json"
        }
        r = self.get(params)
        if 'query' not in r:
            print(f'{cat} replied with {str(r)}.')
            return []
        data = r['query']['categorymembers']
        self._add_datum(data, cat)
        return data

    def get_pages_recursively(self, cat=None):
        """Depth-first walk of ``cat`` (default: ``self.category``), returning all pages."""
        if cat is None:
            cat = self.category
        subcats = [s['title'] for s in self.get_subcategories(cat)]
        data = self.get_pages(cat)
        for c in subcats:
            ndata = self.get_pages_recursively(c)
            print(f'{c} has {len(data)} pages directly and {len(ndata)} in subcategories')
            data.extend(ndata)
        return data

    def get_pageviews(self, page):
        """Average daily views over 2018-06-01..2019-06-01, or the string 'NA' on error."""
        url = f"https://wikimedia.org/api/rest_v1/metrics/pageviews/per-article/en.wikipedia/all-access/all-agents/{page.replace(' ','_').replace('/','%2F')}/monthly/2018060100/2019060100"
        r = self.session.get(url).json()
        if 'items' in r:
            return sum(i['views'] for i in r['items']) / 365
        else:
            print('error', page, r)
            return 'NA'

    def get_content(self, page):
        """Fetch the wikitext of section 0 (the lead, where the infobox lives)."""
        params = {
            'action': "query",
            'prop': 'revisions',
            'rvprop': 'content',
            'rvsection': 0,
            'titles': page,
            'format': "json"
        }
        data = self.session.get(url=self.api, params=params).json()
        pageid = list(data['query']['pages'].keys())[0]
        wikimarkup = data['query']['pages'][pageid]['revisions'][0]['*']
        return wikimarkup.encode('utf-8', 'ignore').decode('unicode_escape', 'ignore')  # not quite right

    def to_csv(self):
        """Don't save as csv for storage. Save as pickle. This is just for casual inspection in Excel."""
        with open(f'{self.category_cleaned}.csv', 'w', newline='') as w:
            dw = csv.DictWriter(w, ['title', 'category', 'ns', 'views', 'pageid'] + self.extra_fields,
                                extrasaction='ignore')
            dw.writeheader()
            dw.writerows(self.data.values())
        return self

    ####### code to convert template to dictionary
    def parse_templates(self, text):
        """Mine every template whose name matches one of ``self.wanted_templates``."""
        dex = {}
        for t in wtp.parse(text).templates:
            for want in self.wanted_templates:
                # normal_name() used because t.name may carry trailing whitespace.
                if want.lower() in t.normal_name().lower():
                    dex.update(self._template_to_dict(t))
        return dex

    def _arg_to_val(self, arg):
        """Flatten a template argument's value to plain text: inner templates
        resolved, tags/comments removed, dashes and spaces normalised."""
        val = arg.value
        for t in arg.templates:
            if t.arguments:
                tval = t.arguments[0].value
                if t.normal_name() in ('nowrap', 'val'):
                    if any(['ul' in a.name for a in t.arguments]):  # unit!
                        tval += [a.value for a in t.arguments if 'u' in a.name][0]  # u= and ul=
                val = val.replace(t.string, tval)
        val = re.sub(r'<.*?/>', '', val)  # remove self-closing tags
        val = val.replace('\xa0', ' ')  # non-breaking space to plain space
        val = re.sub(r'<.*?>.*?</.*?>', '', val)  # remove paired tags and their content
        val = re.sub(r'<!--.*?-->', '', val)  # remove comments
        val = val.replace('\u2013', '-')  # en dash to hyphen-minus
        # bugfix: this line used to replace the en dash a second time;
        # it was meant to catch the em dash.
        val = val.replace('\u2014', '-')  # em dash to hyphen-minus
        val = re.sub(r'±\s+\d+\.?\d*', '', val)  # clear the ± error term for safety
        return val.strip()

    def _arg_to_key(self, arg):
        """Template argument name, stripped of surrounding whitespace."""
        return arg.name.strip()

    def _template_to_dict(self, template):
        """Convert a template's arguments to a {name: cleaned value} dict."""
        return {self._arg_to_key(arg): self._arg_to_val(arg) for arg in template.arguments}
# get the data: every page under the luminosity-class category tree,
# mining the Starbox templates and skipping anything Sun-related.
stars = WikicatParser('Category:Stars by luminosity class', wanted_templates=['Starbox'],
                      forbidden_categories_keywords=['Sun'])
stars.get_pages_recursively()
## save before sending to pd as something might get munted.
pickle.dump(stars.data, open('wiki_stars.dict.p', 'wb'))
json.dump(stars.data, open('wiki_stars.json', 'w'))
I don't fully trust the numbers on wiki, as they are written by humans. Take Procyon, which had a wrong HIP and HD.
### file downloaded online from
# http://www.astronexus.com/files/downloads/hygdata_v3.csv.gz
# field defs are there.
hyg = pd.read_csv('hygdata_v3.csv')
hyg = hyg.drop([0], axis=0)  # row 0 is the Sun: drop it ("dim the sun")
hyg = hyg.loc[~hyg.ra.isna()]  # keep only rows with a right ascension
def get_name(row):
    """Best available designation for a HYG row: proper name first, then
    Bayer/Flamsteed, then HD, Gliese and Hipparcos catalogue numbers."""
    if isinstance(row.proper, str):
        return row.proper
    if isinstance(row.bf, str):
        return row.bf
    if isinstance(row.hd, float) and row.hd > 0:
        return 'HD ' + str(int(row.hd))
    if isinstance(row.gl, float) and row.gl > 0:
        return f'Gliese{row.gl}'
    if isinstance(row.hip, float) and row.hip > 0:
        return f'HIP{row.hip}'
    return '???'
# derived columns: spherical coordinates for an azimuthal projection,
# a display name, and a marker size proportional to apparent brightness.
hyg = hyg.assign(az_rho=hyg.dec.apply(lambda d: np.pi / 2 - d))
hyg = hyg.assign(az_theta=hyg.ra.apply(lambda h: h / 24 * 360 if h > 0 else 360 + h / 24 * 360))
hyg = hyg.assign(named=hyg.apply(get_name, axis=1).astype(str))
hyg = hyg.assign(s=1 / (2.5 ** hyg.mag))
import json, re  # already imported above; kept so the cell is self-contained
wiki = json.load(open('wiki_stars.json', 'r')).values()
whd = {}  # HD catalogue number -> average daily views
for star in wiki:
    # fish the first HD number out of the page's mined data.
    r = re.search('HD.*?(\d+)', str(star).replace('\n', ''))
    if r:
        hd = int(r.group(1))
        if star['title'] == 'Procyon':
            # Procyon's infobox carried a wrong HD number: hardcode the right one.
            whd[61421] = star['views']
        elif hd in whd:
            print(f'Duplication for HD {hd} —{star["title"]}')
        else:
            whd[hd] = star['views']
def get_views(hd, views=None):
    """Return the average daily wiki views for an HD number, or None.

    :param hd: HD catalogue number; anything int-coercible. NaN/None (stars
        without an HD number in HYG) yield None.
    :param views: mapping of HD number -> views; defaults to the module-level
        ``whd`` built above (parameterised so the lookup table can be swapped).
    """
    lookup = whd if views is None else views
    try:
        hd = int(hd)
    except (TypeError, ValueError):  # NaN or missing HD number
        return None
    if hd in lookup:
        return float(lookup[hd])
    return None
# attach the wiki daily views to each HYG row via its HD number.
hyg = hyg.assign(wikiviews=hyg.hd.apply(get_views))
## fix the broken ones.
# NOTE: .at[...index.values[0]] will raise IndexError if the row is absent,
# which is deliberate — these are targeted manual corrections.
hyg.at[hyg.loc[hyg.hip == 94311].index.values[0], 'wikiviews'] = 30  # 19 Lyr is a new article.
hyg.at[hyg.loc[hyg.hd == 58061].index.values[0], 'named'] = 'CY CMa'
hyg.at[hyg.loc[hyg.hd == 168442].index.values[0], 'named'] = 'Gliese 710'
hyg.at[hyg.loc[hyg.hd == 140283].index.values[0], 'named'] = 'Methuselah star'
hyg.at[hyg.loc[hyg.hd == 304043].index.values[0], 'named'] = 'Innes\'s star'
## mag
shyg = hyg.loc[hyg.mag < 7]  # restrict to roughly naked-eye magnitudes
m = max(shyg.s)
constellations = [shyg.loc[shyg.con == c] for c in set(shyg.con) if c]
# one white marker per star, area scaled by apparent brightness.
magscatters = [
    go.Scattergl(
        x=shyg.ra,
        y=shyg.dec,
        text=shyg.named,
        mode='markers',
        marker={
            'size': shyg.s,
            'sizeref': 2. * max(hyg.s) / (30. ** 2),
            'sizemode': 'area',
            'color': 'white',
        },
    )
]
## wiki
# keep only stars with a non-trivial number of wiki views.
shyg = hyg.loc[~hyg.wikiviews.isna()]
shyg = shyg.loc[shyg.wikiviews > 0.1]
m = max(shyg.wikiviews)
# one white marker per star, area scaled by daily wiki views.
wikiscatters = [
    go.Scattergl(
        x=shyg.ra,
        y=shyg.dec,
        text=shyg.named,
        mode='markers',
        marker={
            'size': shyg.wikiviews.values,
            'sizeref': 2 * m / (30. ** 2),
            'sizemode': 'area',
            'color': 'white',
        },
    )
]
def _star_annotations(rows, color, shift=-20):
    """One labelled arrow annotation per star row, in the given colour.

    :param rows: dataframe slice with ra/dec/named columns.
    :param color: css colour for both the label text and the arrow.
    :param shift: pixel offset of the label from the point (both axes).
    """
    return [go.layout.Annotation(
        x=row.ra,
        y=row.dec,
        xref="x",
        yref="y",
        text=row.named,
        ax=shift,
        ay=shift,
        font=dict(size=10, color=color),
        arrowcolor=color,
    ) for i, row in rows.iterrows()]

# This used to be four near-identical copy-pasted list comprehensions;
# factored into the helper above.
# labels for the magnitude plot: every bright star.
bright = hyg.loc[(hyg.mag < 1.5)]
mag_annotations = _star_annotations(bright, "skyblue")
# labels for the wiki plot: bright and well-read ...
wiki_bright = hyg.loc[(hyg.wikiviews > 200) & (hyg.mag < 1.5)]
wiki_bright_annotations = _star_annotations(wiki_bright, "skyblue")
# ... bright but under-read ...
unwiki_bright = hyg.loc[(hyg.wikiviews < 200) & (hyg.mag < 1.5)]
unwiki_bright_annotations = _star_annotations(unwiki_bright, "lightcoral")
# ... and dim but well-read (offset the other way to reduce clutter).
wiki_dim = hyg.loc[(hyg.wikiviews > 100) & (hyg.mag > 1.5)]
wiki_dim_annotations = _star_annotations(wiki_dim, "lime", shift=20)
common = 'Equirectangular projection (plate carrée) of all stars:'
# base layout; note it is mutated below and reused for the wiki-views figure.
layout = {'title': f'{common} Magnitude',
          'showlegend': False,
          'yaxis': {'range': [-90, 90]},  # declination in degrees
          'xaxis': {'range': [0, 24]},    # right ascension in hours
          'annotations': mag_annotations,
          'plot_bgcolor': 'black'
          }
#frames = [go.Frame(data=wikiscatters, layout={'title': f'{common} wiki views'})]
fig = go.Figure(data=magscatters, layout=layout)
iplot(fig, image='png', filename='mag.png', image_width=1280, image_height=1280)
pio.write_image(fig, 'mag_label.png')
# second figure: same layout, wiki-views markers and annotations instead.
layout['title'] = f'{common} Wiki daily views'
layout['annotations'] = wiki_bright_annotations + unwiki_bright_annotations + wiki_dim_annotations
fig = go.Figure(data=wikiscatters, layout=layout)
iplot(fig)
pio.write_image(fig, 'wiki_label.png')
# magnitude vs. log2 of daily wiki views, split by celestial hemisphere.
northern = hyg.loc[(~hyg.wikiviews.isna()) & (hyg.dec >= 0)]
southern = hyg.loc[(~hyg.wikiviews.isna()) & (hyg.dec < 0)]
fig = go.Figure(data=[go.Scattergl(x=northern.mag, y=np.log2(northern.wikiviews), name='Northern', text=northern.named, mode='markers', opacity=0.2),
                      go.Scattergl(x=southern.mag, y=np.log2(southern.wikiviews), name='Southern', text=southern.named, mode='markers', opacity=0.2)],
                layout={'title': 'Magnitude vs views',
                        'xaxis': {'title': 'Magnitude'},
                        'yaxis': {'title': 'log2 Wikiviews', 'range': [-0.5, 11]}
                        })
iplot(fig)